// Copyright (c) 2011, Object Computing, Inc.
// All rights reserved.
// See the file license.txt for licensing information.
//
////////////////////////////
/// UNDER CONSTRUCTION!!!
/// NOT READY FOR USE
////////////////////////////
#ifdef _MSC_VER
# pragma once
#endif
#ifndef ATOMICSINGLESERVERBUFFERQUEUE_H
#define ATOMICSINGLESERVERBUFFERQUEUE_H
// All inline, do not export.
//#include <Common/QuickFAST_Export.h>
#include "AtomicQueue_fwd.h"
#include <Common/AtomicPointer.h>
namespace QuickFAST
{
  namespace Communication
  {

    ///@brief Keep a queue of buffers waiting to be serviced by one server thread.
    ///
    /// Buffers may be generated by multiple threads, but must be serviced by a single
    /// thread (at any one time.)
    ///
    /// A thread with a buffer that needs to be processed locks a mutex and calls push().
    /// If push() returns true, that thread is now responsible for servicing the queue.
    /// To service the queue,
    ///    release the mutex (only one thread services at a time
    ///                        so no synchronization is needed)
    ///    for(bool more = startService();
    ///        more;
    ///        {lock} more = endService(){unlock})
    ///    {
    ///       for buffer returned by serviceNext()
    ///         process buffer
    ///    }
    ///
    /// Synchronization is by an external mutex, to allow other data items to be synchronized by the
    /// same mutex.  Placeholder lock arguments help enforce the synchronization rules.  These
    /// locks are not actually used here.
    ///
    /// Note that the queue can be serviced without synchronization since only one thread will be
    /// doing the work.
    ///
    /// This object does not manage buffer lifetimes.  It assumes
    /// that buffers outlive the collection.
    class AtomicSingleServerBufferQueue
    {
      /// @brief Value of busy_ while some thread is responsible for servicing the queue.
      static const long BUSY = 1;
      /// @brief Value of busy_ while no thread is servicing the queue.
      static const long IDLE = 0;

    public:
      /// @brief Construct an empty, idle queue.
      AtomicSingleServerBufferQueue()
        : incomingHead_(0)
        , busy_(IDLE)
      {
      }

      /// @brief Push a buffer onto the incoming list.
      ///
      /// The buffer is linked in front of the current incoming head using a
      /// compare-and-swap retry loop, then an attempt is made to switch busy_
      /// from IDLE to BUSY.  If the queue was already busy and this buffer was
      /// pushed onto an empty incoming list, a thread waiting in refresh() is
      /// notified.
      ///
      /// @param buffer is the buffer to be added to the queue
      /// @param unused scoped_lock is a placeholder; it is not used here.
      /// @returns true if the queue was idle, in which case the caller is now
      ///          responsible for servicing it.
      bool push(LinkedBuffer * buffer, boost::mutex::scoped_lock &)
      {
        bool first = false;
        bool ok = false;
        while(!ok)
        {
          // Point the new buffer at the head we observed, then try to install
          // it as the new head.  If the head changed in the meantime the CAS
          // fails and we retry against the new head.
          buffer->link(incomingHead_);
          first = buffer->link() == 0;
          ok = incomingHead_.CAS(buffer->link(), buffer);
        }
        bool wasIdle = CASLong(&busy_, IDLE, BUSY);
        if(!wasIdle && first)
        {
          // Notify while holding waitMutex_ so the notification cannot slip in
          // between refresh()'s check of incomingHead_ and its call to wait().
          boost::mutex::scoped_lock lock(waitMutex_);
          condition_.notify_one();
        }
        return wasIdle;
      }

      /// @brief Promote buffers from the incoming list to the outgoing queue.
      ///
      /// The entire incoming list is detached with a compare-and-swap retry
      /// loop and each detached buffer is then handed to outgoing_.push_front().
      /// Asserts that the queue is busy and that the outgoing queue is empty.
      void promote()
      {
        assert(busy_);
        assert(outgoing_.isEmpty());
        bool ok = false;
        LinkedBuffer * buffers = 0;
        while(!ok)
        {
          buffers = incomingHead_;
          ok = incomingHead_.CAS(buffers, 0);
        }
        while(buffers != 0)
        {
          // Remember the successor before handing the buffer to the outgoing queue.
          LinkedBuffer * next = buffers->link();
          outgoing_.push_front(buffers);
          buffers = next;
        }
      }

      /// @brief Prepare to service this queue
      ///
      /// All buffers collected so far will be set aside to be serviced
      /// by this thread.  Any buffers arriving after this call will be
      /// held for later.
      ///
      /// If this method returns true, the calling thread MUST completely
      /// service the queue.  If nothing was waiting, busy_ is switched back
      /// to IDLE and false is returned.
      ///
      /// @param unused scoped_lock is a placeholder; it is not used here.
      /// @returns true if the queue is now ready to be serviced
      bool startService(boost::mutex::scoped_lock &)
      {
        assert(busy_);
        promote();
        bool busy = true;
        if(outgoing_.isEmpty())
        {
          busy = false;
          CASLong(&busy_, BUSY, IDLE);
        }
        return busy;
      }

      /// @brief Service the next entry
      ///
      /// No locking is required because the queue should be serviced
      /// by a single thread (the one that set busy_ to BUSY).
      /// @returns the result of outgoing_.pop(): the entry to be processed
      ///          or zero if this batch of entries is finished.
      LinkedBuffer * serviceNext()
      {
        assert(busy_);
        return outgoing_.pop();
      }

      /// @brief Service all pending entries at once
      /// @returns the result of outgoing_.popList(): the first entry in a
      ///          linked list of buffers.
      LinkedBuffer * serviceAll()
      {
        assert(busy_);
        return outgoing_.popList();
      }

      /// @brief Relinquish responsibility for servicing the queue
      ///
      /// Unless recheck is false, this call will prepare a new batch of buffers
      /// to be processed assuming any arrived while the previous batch was being
      /// serviced.  When there are more buffers, this thread must continue to
      /// process them.  Otherwise busy_ is switched back to IDLE.
      ///
      /// Note that promote() asserts that the outgoing queue is empty, so the
      /// previous batch must have been drained before calling this with
      /// recheck set to true.
      ///
      /// @param recheck should normally be true indicating that this thread is
      ///        willing to continue servicing the queue.
      /// @param unused scoped_lock is a placeholder; it is not used here.
      /// @returns true if there are more entries to be serviced.
      bool endService(bool recheck, boost::mutex::scoped_lock &)
      {
        assert(busy_);
        if(recheck)
        {
          promote();
        }
        if(!outgoing_.isEmpty())
        {
          return true;
        }
        CASLong(&busy_, BUSY, IDLE);
        return false;
      }


      /// @brief Promote buffers from incoming to outgoing (service thread only)
      ///
      /// This method should be called only by the service thread.
      /// If the outgoing queue already holds buffers nothing is changed and
      /// true is returned.  Otherwise any accumulated incoming buffers are
      /// promoted to the outgoing queue.
      /// If wait is false and nothing could be promoted, the return value is false.
      /// If wait is true, this call blocks until incoming buffers are available,
      /// promotes them, and returns true.
      ///
      /// @param unused scoped_lock is for legacy compatibility
      /// @param wait is true if this call should wait for incoming buffers to be available.
      /// @returns true if the outgoing queue contains at least one buffer.
      bool refresh(boost::mutex::scoped_lock &, bool wait)
      {
        assert(busy_);
        if(!outgoing_.isEmpty())
        {
          return true;
        }
        promote();
        while(outgoing_.isEmpty())
        {
          if(!wait)
          {
            return false;
          }
          {
            // Test the predicate under waitMutex_ (push() notifies under the
            // same mutex) so a push cannot be missed, and loop so a spurious
            // wakeup simply waits again.
            boost::mutex::scoped_lock lock(waitMutex_);
            while(incomingHead_ == 0)
            {
              condition_.wait(lock);
            }
          }
          promote();
        }
        return true;
      }

      /// @brief A nondestructive peek at the outgoing queue.
      /// @returns the result of outgoing_.begin().
      const LinkedBuffer * peekOutgoing()const
      {
        return outgoing_.begin();
      }

    private:
      /// Head of the singly linked list of buffers pushed but not yet promoted.
      AtomicPointer<LinkedBuffer> incomingHead_;
      /// Buffers that have been promoted and are ready to be serviced.
      BufferQueue outgoing_;
      /// Mutex paired with condition_ for the wait in refresh().
      boost::mutex waitMutex_;
      /// Signalled by push() when a buffer lands on an empty incoming list of a busy queue.
      boost::condition_variable condition_;
      /// BUSY while a thread is responsible for servicing the queue, IDLE otherwise.
      volatile long busy_;
    };
  }
}
#endif // ATOMICSINGLESERVERBUFFERQUEUE_H
